#!/usr/bin/env python3
# coding: utf-8
# Time: 2023-04-20 14:29:37

"""
解析项目中的测试套和用例信息，支持多种导出格式
用法：./extract_case.py
"""

import json
import os
import re
import sys
import subprocess
from typing import List

g_error_count = 0


def error(testsuite, testcase, *args, **kwargs):
    global g_error_count
    g_error_count += 1
    print("ERROR", testsuite, testcase, *args, **kwargs)


def is_testcase(file_path: str) -> bool:
    if not os.path.isfile(file_path):
        return False
    # 文件后缀名不符合，直接返回False
    if not (file_path.endswith(".c") or file_path.endswith(".sh") or file_path.endswith(".py")):
        return False
    line_count = 0
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f.readlines():
            line_count += 1
            if "@用例ID:" in line:
                return True
            # 有的文件可能很大，有很多行，关键字[@用例ID:]一般会出现在文件头部，因此限制一下读的文件行数
            if line_count > 100:
                return False
    return False


def is_testsuite(dir_path: str) -> bool:
    if not os.path.isdir(dir_path):
        return False
    if not os.path.islink(os.path.join(dir_path, 'tsuite')):
        return False
    if not os.path.isdir(os.path.join(dir_path, 'tst_common')):
        return False
    if not os.path.isdir(os.path.join(dir_path, 'testcase')):
        return False
    return True


# 在Python中执行shell命令
class Command:
    def __init__(self, command, cwd=None, timeout=None):
        self.command = command
        self.cwd = cwd
        self.timeout = timeout
        self.output_stdout = None
        self.output_stderr = None
        self.proc = None
        self.pid = 0
        self.return_code = 0

    def run(self, stdout=None, stderr=None):
        run_stdout = sys.stdout if stdout is None else stdout
        run_stderr = sys.stderr if stderr is None else stderr
        self.proc = subprocess.Popen(self.command, shell=True, encoding='utf-8', cwd=self.cwd,
                                     stdout=run_stdout, stderr=run_stderr)
        self.proc.communicate(timeout=self.timeout)
        self.pid = self.proc.pid
        self.return_code = self.proc.returncode
        run_stdout.flush()
        run_stderr.flush()
        return self.proc.returncode

    def run_quiet(self):
        self.proc = subprocess.Popen(self.command, shell=True, encoding='utf-8', cwd=self.cwd,
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE)
        (self.output_stdout, self.output_stderr) = self.proc.communicate(timeout=self.timeout)
        self.pid = self.proc.pid
        self.return_code = self.proc.returncode
        return self.proc.returncode


class TestCase:
    """
    测试用例初始化传入一个用例文件，通过解析用例文件内容实例化用例
    """

    # 普通用例属性，一个用例只有一个，文件中第一个属性生效
    COMMON_ATTR = ('用例ID', '用例名称', '用例级别', '用例类型', '自动化', '超时时间')
    # 可以有多个的用例属性
    MULTI_ATTR = ('用例标签', '扩展属性', '用例描述', '预置条件')
    # 用例ID唯一
    TESTCASE_ID = dict()

    def __init__(self, testsuite_path: str, testcase_file: str):
        self.attr_text = list()
        self.testsuite_path = testsuite_path
        self.testsuite_name = os.path.basename(self.testsuite_path)
        self.testcase_file = testcase_file
        # 用例属性中None表示没有默认值，必须要能从用例文件中提取到，其他使用默认值
        self.testcase_attr = {
            '用例文件': None,
            '用例ID': None,
            '用例名称': None,
            '用例级别': '3',
            '用例标签': '',
            '扩展属性': '',
            '用例类型': '功能测试',
            '自动化': '1',
            '超时时间': '0',
            '用例描述': '',
            '预置条件': '',
            '测试步骤': None,
            '预期结果': None,
        }
        self.parse()

    @classmethod
    def check_id_unique(cls, testcase_id, testsuite_name, testcase_file):
        if testcase_id in cls.TESTCASE_ID:
            if testsuite_name == cls.TESTCASE_ID[testcase_id][0] and testcase_file == cls.TESTCASE_ID[testcase_id][1]:
                return True
            error(testsuite_name, testcase_file,
                  f'用例ID {testcase_id} 重复：{testsuite_name} {testcase_file} 和 '
                  f'{cls.TESTCASE_ID[testcase_id][0]} {cls.TESTCASE_ID[testcase_id][1]}')
            return False
        cls.TESTCASE_ID[testcase_id] = (testsuite_name, testcase_file)
        return True

    def check(self):
        """
        检查用例属性是否满足基本格式要求
        :return: 用例属性检查的错误个数，0表示用例属性格式无错误
        """
        value = self.testcase_attr.get('用例ID')
        if value is None or re.match(r'\d{8}-\d{6}-\d{9}', value) is None:
            error(self.testsuite_name, self.testcase_attr.get('用例文件'),
                  f"用例ID {value} 格式错误，正确格式为：20220418-230037-838974137。"
                  f"推荐使用命令生成用例，其中用例ID自动生成：./tsuite new case [sh|c|py] case_name")

        value = self.testcase_attr.get('用例名称')
        if value is None or len(value) == 0:
            error(self.testsuite_name, self.testcase_attr.get('用例文件'), f"用例名称不能为空")

        value = self.testcase_attr.get('用例级别')
        if value is None or re.match(r'\d', value) is None or int(value) > 4 or int(value) < 0:
            error(self.testsuite_name, self.testcase_attr.get('用例文件'),
                  f"用例级别必须为数字，且取值范围为[0-4]")

        value = self.testcase_attr.get('用例类型')
        valid_type = ('功能测试', '性能测试', '安全测试', '可靠性测试', '稳定性测试',
                      '兼容性测试', '界面体验测试', '其他')

        if value is None or len(value) == 0 or value not in valid_type:
            error(self.testsuite_name, self.testcase_attr.get('用例文件'),
                  f"用例类型有效取值范围是：{'，'.join(valid_type)}")

        value = self.testcase_attr.get('自动化')
        if value is None or len(value) == 0 or re.match(r'\d', value) is None or int(value) not in (0, 1):
            error(self.testsuite_name, self.testcase_attr.get('用例文件'),
                  f"自动化取值为0表示本用例为手动用例，1表示为自动化用例，其他值无效")

        value = self.testcase_attr.get('超时时间')
        if value is None or len(value) == 0 or re.match(r'\d+', value) is None or int(value) < 0:
            error(self.testsuite_name, self.testcase_attr.get('用例文件'),
                  f"超时时间应为大于等于0的整数，单位为秒，0表示用例不设置超时时间")

        value = self.testcase_attr.get('测试步骤')
        if value is None or len(value) == 0:
            error(self.testsuite_name, self.testcase_attr.get('用例文件'),
                  f"用例测试步骤不能为空")

        value = self.testcase_attr.get('预期结果')
        if value is None or len(value) == 0:
            error(self.testsuite_name, self.testcase_attr.get('用例文件'),
                  f"用例预期结果不能为空")

    def _read_testcase_attrs(self):
        with open(self.testcase_file, encoding='utf-8') as f:
            for line in f.readlines():
                if re.search(r'@.*:', line):
                    self.attr_text.append(line)

    def _get_attr_common(self, attr: str) -> str:
        """
        获取指定属性的值
        :param attr: 需要获取的属性
        :return: 无有效值则返回None
        """
        for line in self.attr_text:
            if f'@{attr}:' not in line:
                continue
            return re.sub(f'.*@{attr}:', '', line).strip()

    def _get_attr_multi(self, attr: str) -> str:
        """
        获取属性的值，如果值有多个则组成tuple返回，值为空会被忽略
        :param attr:
        :return: 无有效值则返回None
        """
        attr_list = list()
        for line in self.attr_text:
            if f'@{attr}:' not in line:
                continue
            value = re.sub(f'.*@{attr}:', '', line).strip()
            if value:
                attr_list.append(value)
        return '\n'.join(attr_list) if attr_list else None

    def _get_step_expect(self):
        step = list()
        step_count = 0
        expect = list()
        # 检查测试步骤和预期结果是否匹配
        # 0 -- 表示初始状态
        # 1 -- 表示已经有测试步骤了
        # 2 -- 表示已经有预期结果了
        flag = 0
        for line in self.attr_text:
            if f'@测试步骤:' in line:
                step_count += 1
                step_value = re.sub(f'.*@测试步骤:([0-9]:)*', '', line).strip()
                if len(step_value) == 0:
                    error(self.testsuite_name, self.testcase_attr.get('用例文件'),
                          f"用例文件中第 {step_count} 个测试步骤内容为空")
                step.append(f"步骤{step_count}：{step_value}")
                flag = 1
            if f'@预期结果:' in line:
                if flag == 0:
                    raise ValueError(f'{self.testcase_file}: 预期结果前没有测试步骤：{line}')
                if flag == 2:
                    raise ValueError(f'{self.testcase_file}: 预期结果不能连续出现：{line}')
                expect_value = re.sub(f'.*@预期结果:([0-9]:)*', '', line).strip()
                if len(expect_value) == 0:
                    error(self.testsuite_name, self.testcase_attr.get('用例文件'),
                          f"用例文件中第 中第 {step_count} 步的预期结果内容为空")
                expect.append(f"步骤{step_count}预期：{expect_value}")
                flag = 2
        return '\n'.join(step), '\n'.join(expect)

    def parse(self):
        """
        从用例文件里面解析用例属性信息
        :return:
        """
        if not os.path.isfile(self.testcase_file):
            raise FileNotFoundError(f'testcase file {self.testcase_file} not found')
        if not is_testcase(self.testcase_file):
            raise TypeError(f'file {self.testcase_file} not testcase')
        self._read_testcase_attrs()
        self.testcase_attr['用例文件'] = os.path.realpath(self.testcase_file).replace(
            os.path.realpath(self.testsuite_path), '', 1).lstrip('/')
        for attr in TestCase.COMMON_ATTR:
            value = self._get_attr_common(attr)
            if value:
                self.testcase_attr[attr] = value
        for attr in TestCase.MULTI_ATTR:
            value = self._get_attr_multi(attr)
            if value:
                self.testcase_attr[attr] = value
        step, expect = self._get_step_expect()
        self.testcase_attr['测试步骤'] = step
        self.testcase_attr['预期结果'] = expect
        self.check()

    def get_extern_attr(self, key: str) -> str:
        """
        从扩展属性中提取指定关键字的值
        :param key: 扩展属性中的关键字
        :return: 如果没找到则返回None
        """
        if f'{key}=' not in self.testcase_attr['扩展属性']:
            return ''
        regex = re.compile(rf"{key}='(?P<signal_quotes>.*?)'")
        match = regex.match(self.testcase_attr['扩展属性'])
        if match:
            return match.groupdict()['signal_quotes'].strip()
        regex = re.compile(rf'{key}="(?P<double_quotes>.*?)"')
        match = regex.match(self.testcase_attr['扩展属性'])
        if match:
            return match.groupdict()['double_quotes'].strip()
        regex = re.compile(rf"{key}=(?P<no_quotes>.*?)\s")
        match = regex.match(self.testcase_attr['扩展属性'])
        if match:
            return match.groupdict()['no_quotes'].strip()


class TestSuite:
    """
    测试套初始化传入测试套所在的目录，通过解析测试套内的信息进行实例化
    """

    def __init__(self, testsuite_path: str):
        self.testsuite_name = None
        self.testsuite_path = testsuite_path
        self.testcases: List[TestCase] = list()
        self.parse()

    def check(self):
        testcase_names = dict()
        for testcase in self.testcases:
            case_path = testcase.testcase_attr['用例文件']          
            testcase_name = os.path.join(testcase.testsuite_name, case_path).replace("/", "_")
            new_testcase_name = ".".join(testcase_name.split(".")[:-1])     
            if new_testcase_name in testcase_names:
                error(self.testsuite_name, testcase.testcase_attr.get('用例文件'),
                      f'用例名称 {new_testcase_name} 重复：{testcase.testcase_attr.get("用例文件")} 和 '
                      f'{testcase_names[testcase_name]}')
            else:
                testcase_names[testcase_name] = testcase.testcase_attr.get("用例文件")

    def parse(self):
        if not os.path.isdir(self.testsuite_path):
            raise NotADirectoryError(f'testsuite path {self.testsuite_path} not dir')
        if not is_testsuite(self.testsuite_path):
            raise TypeError(f'path {self.testsuite_path} not testsuite')
        self.testsuite_name = os.path.basename(self.testsuite_path)
        # 遍历测试套下的testcase目录
        for now_path, dirs, files in os.walk(os.path.join(self.testsuite_path, 'testcase')):
            for file_name in files:
                file_path = os.path.join(now_path, file_name)
                if not is_testcase(file_path):
                    continue
                self.testcases.append(TestCase(self.testsuite_path, file_path))
        self.check()


class TestProject:
    """
    一个测试项目对应一个git仓库，有的git仓库本身是一个测试套，有的git仓库可能有多个测试套
    """

    def __init__(self, project_path: str):
        self.project_name = None
        self.project_path = project_path
        self.testsuites: List[TestSuite] = list()
        self.is_project_one_suite = False
        self.parse()
        cmd = ""
        cmd = Command(f"cd {self.project_path} && git ls-remote --get-url origin")
        cmd.run_quiet()
        self.git_root = cmd.output_stdout

    def parse(self):
        if not os.path.isdir(self.project_path):
            raise NotADirectoryError(f'project path {self.project_path} not dir')
        self.project_name = os.path.basename(self.project_path)
        # 先看看本项目是不是一个测试套
        if is_testsuite(self.project_path):
            self.testsuites.append(TestSuite(self.project_path))
            self.is_project_one_suite = True
        else:
            for now_dir, dirs, files in os.walk(self.project_path):
                for dir_name in dirs:
                    dir_path = os.path.join(now_dir, dir_name)
                    if not is_testsuite(dir_path):
                        continue
                    self.testsuites.append(TestSuite(dir_path))
        if len(self.testsuites) == 0:
            raise ValueError(f'no testsuite in project {self.project_path}')
        for testsuite in self.testsuites:
            if self.is_project_one_suite:
                tsuite = os.path.join('.', self.project_name, 'tsuite')
            else:
                tsuite = os.path.join('.', self.project_name, testsuite.testsuite_name, 'tsuite')
            testsuite.setup_cmd = f'{tsuite} setup'
            testsuite.teardown_cmd = f'{tsuite} teardown'
            for testcase in testsuite.testcases:
                TestCase.check_id_unique(testcase.testcase_attr.get('用例ID'),
                                         testsuite.testsuite_name,
                                         testcase.testcase_attr.get('用例文件'))
                if testcase.testcase_attr['自动化'] == '1':
                    testcase.testcase_attr['执行命令'] = f'{tsuite} run {testcase.testcase_attr["用例文件"]}'
                else:
                    testcase.testcase_attr['执行命令'] = ''

    def export_zhiyan(self):
        import openpyxl
        wb = openpyxl.Workbook()
        ws = wb.active
        ws.title = '用例'
        columns = ['用例目录', '用例名称', '描述', '等级', '用例类型', '前置条件', '步骤描述类型', '步骤', '预期结果',
                   '是否自动化', '标签', '关联tapd需求', '用例路径', '用例执行参数', '过滤标签']
        nr_columns = len(columns)
        row = 1
        for c in range(nr_columns):
            ws.cell(row=row, column=c + 1).value = columns[c]
        for testsuite in self.testsuites:
            for testcase in testsuite.testcases:
                zhiyan_dir = testcase.get_extern_attr('用例目录')
                testcase.testcase_attr['用例目录'] = zhiyan_dir
                if zhiyan_dir is None or len(zhiyan_dir) == 0 or (not zhiyan_dir.startswith('信创项目/7.')):
                    error(testsuite.testsuite_name, testcase.testcase_attr.get('用例文件'),
                          f'用例扩展属性中没有定义[用例目录]，当前内容为：{zhiyan_dir}，正确格式应为：'
                          f'@扩展属性: 用例目录="信创项目/7.x XXXX测试内容/XXXX测试项"，'
                          f'具体要求参考：https://git.woa.com/tlinux/TST/self-suite/tencentos-cert/blob/master/README.md')
                zhiyan_name = testcase.testcase_attr.get("用例名称")
                if zhiyan_name is None or len(zhiyan_name) == 0 or re.match(r'^\d+[-_]*', zhiyan_name) is None:
                    error(testsuite.testsuite_name, testcase.testcase_attr.get('用例文件'),
                          f'用例名称不符合信创用例要求，当前用例名称为：{zhiyan_name}，'
                          f'信创用例名称要求格式为：@用例名称: 序号_用例名，需要参考表格中的A列，'
                          f'具体要求参考：https://git.woa.com/tlinux/TST/self-suite/tencentos-cert/blob/master/README.md')
                row += 1
                for c in range(nr_columns):
                    zhiyan = Zhiyan(testcase)
                    ws.cell(row=row, column=c + 1).value = zhiyan.testcase_info[columns[c]]
        wb.save(f'{self.project_name}.xlsx')

    def suite_extract(self, testcase):
        suite_cases = list()
        tcase = Tcase(testcase)
        if is_testsuite(self.project_path):
            tsuite = os.path.join('.', self.project_name, 'tsuite')
            testsuite_path = self.project_path
        else:
            tsuite = os.path.join('.', self.project_name, testcase.testsuite_name, 'tsuite')
            testsuite_path = os.path.join(self.project_path, testcase.testsuite_name)
        
        tcase.testcase_info['name'] = testcase.testsuite_name
        tcase.testcase_info['is_template_case'] = 2
        tcase.testcase_info['test_step'] = ''
        tcase.testcase_info['expect_result'] = ''
        tcase.testcase_info['signs'] = ''
        tcase.testcase_info['directory'] = ''
        tcase.testcase_info['git_path'] = self.git_root
        tcase.testcase_info['run_cmd'] = ''
        tcase.testcase_info['u_id'] = ''
        tcase.testcase_info['auto'] = ''
        tcase.testcase_info['timeout'] = ''
        tcase.testcase_info['series'] = ''

        create_cmd = Command(f"cd {self.project_path} && git log --reverse --format='%an' {testsuite_path} | head -n 1")
        create_cmd.run_quiet()
        tcase.testcase_info['create_person'] = create_cmd.output_stdout.rstrip('\n')

        update_cmd = Command(f"cd {self.project_path} && git log -n 1 --format='%an' {testsuite_path}")
        update_cmd.run_quiet()
        tcase.testcase_info['update_person'] = update_cmd.output_stdout.rstrip('\n')

        tcase.testcase_info['setup_cmd'] = f'{tsuite} setup'
        tcase.testcase_info['teardown_cmd'] = f'{tsuite} teardown'

        suite_cases.append(tcase.testcase_info)
        return suite_cases

    def case_extract(self, testcase):
        suite_cases = list()
        tcase = Tcase(testcase)
        if is_testsuite(self.project_path):
            testcase_path = os.path.join(".", 'testcase', testcase.testcase_file)
            tsuite = os.path.join('.', self.project_name, 'tsuite')
        else:
            testcase_path = os.path.join(".", testcase.testsuite_name, 'testcase', testcase.testcase_file)
            tsuite = os.path.join('.', self.project_name, testcase.testsuite_name, 'tsuite')

        case_path = testcase.testcase_attr['用例文件']
        testcase_name = os.path.join(testcase.testsuite_name, case_path).replace("/", "_")
        tcase.testcase_info['name'] = ".".join(testcase_name.split(".")[:-1])                        
        tcase.testcase_info['directory'] = testcase.testsuite_name
        tcase.testcase_info['git_path'] = self.git_root
        
        create_cmd = Command(f"cd {self.project_path} && git log --reverse --format='%an' {testcase_path} | head -n 1")
        create_cmd.run_quiet()
        tcase.testcase_info['create_person'] = create_cmd.output_stdout.rstrip('\n')

        update_cmd = Command(f"cd {self.project_path} && git log -n 1 --format='%an' {testcase_path}")
        update_cmd.run_quiet()
        tcase.testcase_info['update_person'] = update_cmd.output_stdout.rstrip('\n')

        suite_cases.append(tcase.testcase_info)
        return suite_cases

    def export_tcase(self):
        all_case = list()
        for testsuite in self.testsuites:
            for testcase in testsuite.testcases:
                case0 = self.suite_extract(testcase)
                if case0:
                    all_case += case0
                break

        for testsuite in self.testsuites:
            for testcase in testsuite.testcases:
                tcase = Tcase(testcase)
                cases = self.case_extract(testcase)
                if cases:
                    all_case += cases

        print(json.dumps(all_case, indent=4, ensure_ascii=False))
        with open("tcase.json", "w") as f:
           json.dump(all_case, f, indent=4, ensure_ascii=False)


class Zhiyan:
    def __init__(self, testcase: TestCase):
        self.testcase_info = {
            '用例目录': testcase.testcase_attr['用例目录'],
            '用例名称': testcase.testcase_attr['用例名称'],
            '描述': testcase.testcase_attr['用例描述'],
            '等级': f"P{testcase.testcase_attr['用例级别']}",
            '用例类型': testcase.testcase_attr['用例类型'],
            '前置条件': testcase.testcase_attr['预置条件'],
            '步骤描述类型': '文本',
            '步骤': testcase.testcase_attr['测试步骤'],
            '预期结果': testcase.testcase_attr['预期结果'],
            '是否自动化': '已自动化' if testcase.testcase_attr['自动化'] == '1' else '待自动化',
            '标签': testcase.testcase_attr['用例标签'],
            '关联tapd需求': '',
            '用例路径': testcase.testcase_attr['用例文件'],
            '用例执行参数': testcase.testcase_attr['执行命令'],
            '过滤标签': ''
        }

class Tcase:
    def __init__(self, testcase: TestCase):
        self.testcase_info = {
            'name': '',
            'is_template_case': 0,
            'test_scene': '',
            'test_step': testcase.testcase_attr['测试步骤'],
            'expect_result': testcase.testcase_attr['预期结果'],
            'note': '',
            'signs': testcase.testcase_attr['用例标签'],
            'directory': '',
            'level': int(testcase.testcase_attr['用例级别']),
            'git_path': '',
            'run_cmd': testcase.testcase_attr['执行命令'],
            'tapd_url': '',
            'create_person': '',
            'update_person': '',
            'timeout': 0,
            'auto': 1 if testcase.testcase_attr['自动化'] == '1' else 0,
            'u_id': testcase.testcase_attr['用例ID'],
            'series': 0,
            'setup_cmd': '',
            'teardown_cmd': ''
        }

if __name__ == '__main__':
    project = TestProject(os.path.dirname(os.path.abspath(__file__)))
    if len(sys.argv) == 1 or sys.argv[1] == 'tcase':
        project.export_tcase()
    elif sys.argv[1] == 'zhiyan':
        project.export_zhiyan()
    if g_error_count == 0:
        exit(0)
    else:
        exit(1)
